在這篇文章中,我們將探討如何在AWS Lambda上運行一個基於Node.js的 LINE Bot Handler,並解析其技術原理與最佳實踐。首先,我們將概述AWS Lambda的執行生命周期,並介紹官方定義的Lambda Function格式。隨後,我們會詳細檢視事件處理邏輯及Kafka Producer的程式結構。
以下Lambda Function用於處理 LINE bot Webhook,並將提取的訊息內容和回覆訊息所需的Token傳送到 Kafka。
一個 Lambda Function 必須遵循 AWS 官方規範的檔案和函數名稱。根據使用的程式語言,這些規範會有所不同。例如,對於Java 17,程式碼必須包含一個類別HandlerIntegerJava17,並在其中定義 handleRequest函數。
在本專案中,我們使用的是 Node.js,因此必須有一個名為index.js或index.mjs的檔案,並在其中定義一個handler function。其基本結構如下:
export const handler = async (event, context) => {
// 取得AWS Lambda event內容。
// event 是一個 JSON 字串,可以從中解析出 LINE Platform 傳送來的資訊。
const bodyContent = JSON.parse(body);
return context.logStreamName;
};
.env
.eslintrc.json
deploy.sh
env.example
index.js
package.json
utils/kafka/producer.js
@line/bot-sdk: 用於直接回覆LINE使用者的訊息。aws-xray-sdk: 此為將Node.js的程式部署到AWS Lambda上執行的必要依賴dotenv: 用於加載環境變數。kafkajs: 用於將使用者的訊息及Token寫入Kafka Message Queue,供其他程式做進一步的處理。eslint: 用於程式碼格式檢查。eslint-config-airbnb-base: 本專案使用Airbnb 的 ESLint規則。eslint-plugin-import: 用於支持 ES6 模塊的 ESLint 插件。在 .env 文件中包含了以下環境變數:
# 要連接的Kafka Host IP 或 Domain Name
KAFKA_HOST_IP=
# 要將訊息送到哪一個Kafka Topic
KAFKA_TOPIC=TEST-MESSAGE-TOPIC
# 要連接的Kafka Port, 預設為9092
KAFKA_PORT=
# 要連接的Client id
KAFKA_CLIENT_ID=
# 指定LINE BOT 的Channel Access Token
CHANNEL_ACCESS_TOKEN=
使用上一篇的deploy.sh腳本進行部署:
function=LinebotHandlerNode ./deploy.sh
index.jsevent.body的內容就是LINE Platform送過來的Webhook。const { produceMessage } = require('./utils/kafka/producer');
require('dotenv').config();
const kafkaHandler = async (event, context) => {
const kafkaTopic = process.env.KAFKA_TOPIC;
const { body } = event;
const bodyContent = JSON.parse(body);
const { events } = bodyContent;
const produceKafkaResults = [];
events.forEach((e) => {
if (e.type === 'message' && e.message !== undefined && e.message.type === 'text') {
const {
source: {
userId,
},
message: {
text,
},
timestamp,
replyToken,
} = e;
produceKafkaResults.push(produceMessage({
topic: kafkaTopic,
messages: [{ key: userId, value: text }],
}));
}
});
await Promise.all(produceKafkaResults);
return context.logStreamName;
};
// 簡單的handler Factory,當需要開發多個Handler function需要切換時,較為方便
const handlerFactory = (name) => {
const functionCode = {
kafka: kafkaHandler,
other: otherHandler,
};
return functionCode[name];
};
// 將Lambda Handler與主要邏輯分開,方便測試及抽換
exports.handler = async function (event, context) {
const handler = handlerFactory('kafka');
await handler(event, context);
};
utils/kafka/producer.jsproduceMessage: 這邊先從環境變數取得需要的Kafka Host, Port, 以及要指定的Topic等連線資訊。最後將指定的訊息送到對應的Topic。const {
Kafka, logLevel, CompressionTypes, Partitioners,
} = require('kafkajs');
require('dotenv').config();
const host = process.env.KAFKA_HOST_IP || 'localhost';
const port = process.env.KAFKA_PORT || 9092;
const clientId = process.env.KAFKA_CLIENT_ID || 'example-producer';
const kafka = new Kafka({
logLevel: logLevel.DEBUG,
brokers: [`${host}:${port}`],
clientId,
});
const producer = kafka.producer({
allowAutoTopicCreation: true,
createPartitioner: Partitioners.LegacyPartitioner,
});
const produceMessage = async ({ topic = 'NON-GIVEN-MESSAGE', messages = [] } = {}) => {
try {
await producer.connect();
await producer.send({
topic,
compression: CompressionTypes.GZIP,
messages,
});
} catch (e) {
console.error(`[example/producer] ${e.message}`, e);
} finally {
await producer.disconnect();
}
};
module.exports = {
produceMessage,
};
以下說明幾個官方提到的AWS Lambda最佳實踐:
console.log來記錄的日誌,將自動發送到 Amazon CloudWatch Logs,可以在AWS Lambda 主控台找到這個 Lambda Function所在的Log:
以上介紹的 Node.js Lambda Function 能夠解析從 LINE Platform 傳送過來的 Webhook,並從中提取使用者的訊息內容、Token 及其他相關資訊。
由於我們希望整個系統由各司其職的小型服務組成,在這裡,我們不直接對取得的資訊做後續處理,取而代之的是,透過Kafka這個Message Queue,將訊息傳遞給後端的 Bot Server,交給它來做後續的語意判斷及資料儲存等工作。
Kafka作為多個小型服務彼此溝通的橋樑,其運作原理以及如何設定,我們將在後面的章節詳細介紹。
https://docs.aws.amazon.com/lambda/latest/operatorguide/execution-environments.html
https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html#nodejs-best-practices